package com.hrvsr.scheduler.distributed.redisbloomfilter;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

/**
 * Bloom filter whose bits are stored in Redis string keys and accessed with
 * {@code SETBIT}/{@code GETBIT} through a {@link JedisPool}.
 *
 * <p>
 * The filter is sized from the expected number of elements {@code n} and the
 * target false positive rate {@code p}: {@code k = ceil(-ln p / ln 2)} hash
 * functions over {@code m = ceil(-n * ln p / (ln 2)^2)} bits. Each hash value
 * is split into a key number and a bit offset inside that key; the key name is
 * {@code keyPrefix} followed by the key number.
 */
public class RedisBloomFilter implements BloomFilter, Closeable {
	/**
	 * Number of bits addressed inside a single Redis key (2^32 - 1). A hash
	 * value is divided by this to obtain the key number, and the remainder is
	 * the bit offset within that key.
	 */
	private static final long REDIS_BIT_MAX_LEN = 4294967295L;

	private final JedisPool jedisPool;
	private final int dbNo;
	private final String keyPrefix;
	private final HashFunction[] hashFunctions;

	/**
	 * Creates a filter sized for {@code n} elements at the requested false
	 * positive rate.
	 *
	 * @param jedisPool         pool used to borrow Redis connections; closed by
	 *                          {@link #close()}
	 * @param dbNo              Redis database index selected before every
	 *                          operation
	 * @param keyPrefix         prefix of the Redis keys holding the bits
	 * @param n                 expected number of elements, must be positive
	 * @param falsePositiveRate target false positive rate, must be strictly
	 *                          between 0 and 1
	 * @throws NullPointerException     if {@code jedisPool} is {@code null}
	 * @throws IllegalArgumentException if {@code n} or
	 *                                  {@code falsePositiveRate} is out of range
	 */
	public RedisBloomFilter(JedisPool jedisPool, int dbNo, String keyPrefix, long n, double falsePositiveRate) {
		super();
		if (jedisPool == null) {
			throw new NullPointerException("jedisPool");
		}
		if (n <= 0) {
			throw new IllegalArgumentException("n must be positive, got " + n);
		}
		// Written as a negated conjunction so that NaN is rejected as well.
		// Without this check p == 0 requests Integer.MAX_VALUE hash functions,
		// p > 1 a negative count, and p == 1 (or NaN) zero hash functions,
		// which would make contains() answer true for every input.
		if (!(falsePositiveRate > 0.0 && falsePositiveRate < 1.0)) {
			throw new IllegalArgumentException(
					"falsePositiveRate must be in (0, 1), got " + falsePositiveRate);
		}
		this.jedisPool = jedisPool;
		this.dbNo = dbNo;
		this.keyPrefix = keyPrefix;
		int k = getK(falsePositiveRate);
		long m = getM(falsePositiveRate, n);
		this.hashFunctions = genHashFunctions(k, m);
	}

	/**
	 * Computes the number of bits for {@code n} elements at false positive
	 * rate {@code p}.
	 *
	 * @param p false positive rate
	 * @param n expected number of elements
	 * @return {@code ceil(-n * ln p / (ln 2)^2)}
	 */
	private long getM(double p, long n) {
		// The arithmetic order is kept as-is so the computed size, and thus
		// the bit positions of already stored data, does not change.
		double k = -Math.log(p) / Math.log(2);
		double m = (k * n) / (Math.log(2));
		return (long) Math.ceil(m);
	}

	/**
	 * Builds {@code k} hash functions that differ only in their seed.
	 *
	 * @param k   number of hash functions
	 * @param cap capacity handed to every hash function (the bit count)
	 * @return the hash functions, seeded with 25, 35, 45, ...
	 */
	private HashFunction[] genHashFunctions(int k, long cap) {
		HashFunction[] hashes = new HashFunction[k];
		for (int i = 0; i < hashes.length; i++) {
			hashes[i] = new SimpleHashFunction(i * 10 + 25, cap);
		}
		return hashes;
	}

	/**
	 * Computes the number of hash functions for false positive rate {@code p}.
	 *
	 * @param p false positive rate
	 * @return {@code ceil(-ln p / ln 2)}
	 */
	private int getK(double p) {
		double rk = -Math.log(p) / Math.log(2);
		double k = Math.ceil(rk);
		return (int) k;
	}

	/**
	 * Tests whether the given string may have been added.
	 *
	 * @param charSequence value to look up
	 * @return {@code false} if at least one of its bits is unset, {@code true}
	 *         if all of its bits are set
	 */
	@Override
	public boolean contains(String charSequence) {
		long[] hashes = getHashes(charSequence);
		return contains(hashes);
	}

	/**
	 * Reads the bit of every hash value inside one MULTI/EXEC transaction.
	 *
	 * @param hashes bit indexes to test
	 * @return {@code true} only if every bit is set
	 * @throws IllegalStateException if the transaction returns no reply list
	 */
	private boolean contains(long[] hashes) {
		Jedis jedis = null;
		try {
			jedis = jedisPool.getResource();
			jedis.select(dbNo);
			Transaction trans = jedis.multi();
			for (long i : hashes) {
				long[] xy = getCoordinate(i);
				trans.getbit(keyPrefix + xy[0], xy[1]);
			}

			List<Object> answers = trans.exec();
			if (answers == null) {
				// Defensive: fail with a clear message instead of a bare
				// NullPointerException if exec() hands back no reply list.
				throw new IllegalStateException("Redis transaction returned no result");
			}

			for (Object ans : answers) {
				boolean in = ((Boolean) ans).booleanValue();
				if (!in) {
					return false;
				}
			}
			return true;
		} finally {
			if (jedis != null) {
				jedis.close();
			}
		}
	}

	/**
	 * Splits a bit index into the number of the Redis key that holds it and
	 * the offset inside that key.
	 *
	 * @param i bit index (a hash value)
	 * @return {@code { keyNumber, offset }} with
	 *         {@code 0 <= offset < REDIS_BIT_MAX_LEN}
	 */
	private long[] getCoordinate(long i) {
		long x = i / REDIS_BIT_MAX_LEN;
		long y = i % REDIS_BIT_MAX_LEN;
		if (y < 0) {
			// Java's / and % truncate toward zero, so a negative index would
			// produce a negative bit offset. Shift to floor semantics to keep
			// the offset in range; non-negative indexes are unaffected.
			x -= 1;
			y += REDIS_BIT_MAX_LEN;
		}
		return new long[] { x, y };
	}

	/**
	 * Adds the given string by setting all of its bits inside one MULTI/EXEC
	 * transaction.
	 *
	 * @param charSequence value to add
	 */
	@Override
	public void add(String charSequence) {
		// Hash before borrowing a connection so the pooled resource is held
		// only for the Redis round trip.
		long[] hashes = getHashes(charSequence);
		Jedis jedis = null;
		try {
			jedis = jedisPool.getResource();
			jedis.select(dbNo);
			Transaction trans = jedis.multi();
			for (long i : hashes) {
				long[] xy = getCoordinate(i);
				trans.setbit(keyPrefix + xy[0], xy[1], true);
			}
			trans.exec();
		} finally {
			if (jedis != null) {
				jedis.close();
			}
		}
	}

	/**
	 * Applies every configured hash function to the given string.
	 *
	 * @param charSequence value to hash
	 * @return one hash value per hash function, in configuration order
	 */
	private long[] getHashes(String charSequence) {
		long[] res = new long[hashFunctions.length];
		for (int i = 0; i < hashFunctions.length; i++) {
			res[i] = hashFunctions[i].hash(charSequence);
		}
		return res;
	}

	/**
	 * Closes the underlying {@link JedisPool} that was passed to the
	 * constructor.
	 *
	 * @throws IOException declared by {@link Closeable}
	 */
	@Override
	public void close() throws IOException {
		jedisPool.close();
	}

}
